/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.agent;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.TreeMap;
import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.agent.Agent;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.agent.AgentRestResource;
import org.apache.kafka.trogdor.basic.BasicNode;
import org.apache.kafka.trogdor.basic.BasicPlatform;
import org.apache.kafka.trogdor.basic.BasicTopology;
import org.apache.kafka.trogdor.common.ExpectedTasks;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.FilesUnreadableFaultSpec;
import org.apache.kafka.trogdor.fault.Kibosh;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.DestroyWorkerRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.UptimeResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
import org.apache.kafka.trogdor.task.SampleTaskSpec;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=120L)
public class AgentTest {
    private Agent agent;

    /**
     * Builds the single-node {@link BasicPlatform} used by these tests.
     *
     * <p>The topology holds one node, "node01" on "localhost", whose config maps
     * "trogdor.agent.port" to "8888" and whose tag set is empty.
     *
     * @param scheduler the scheduler passed through to the platform
     * @return a platform whose current node is "node01" and which uses a
     *         {@link BasicPlatform.ShellCommandRunner}
     */
    private static BasicPlatform createBasicPlatform(Scheduler scheduler) {
        final String nodeName = "node01";

        HashMap<String, String> nodeConfig = new HashMap<>();
        nodeConfig.put("trogdor.agent.port", Integer.toString(8888));

        TreeMap<String, BasicNode> nodesByName = new TreeMap<>();
        nodesByName.put(nodeName, new BasicNode(nodeName, "localhost", nodeConfig, Collections.emptySet()));

        return new BasicPlatform(
            nodeName,
            new BasicTopology(nodesByName),
            scheduler,
            new BasicPlatform.ShellCommandRunner());
    }

    /**
     * Starts a REST server with an {@link AgentRestResource}, creates an {@link Agent} on top
     * of it, and remembers that agent in {@code this.agent} so {@code cleanUp()} can shut it down.
     *
     * @param scheduler the scheduler given both to the platform and to the agent
     * @return the newly created agent (the same instance stored in {@code this.agent})
     */
    private Agent createAgent(Scheduler scheduler) {
        // NOTE(review): port argument 0 presumably asks for an ephemeral port; tests read the
        // real port back through agent.port() -- confirm against JsonRestServer.
        JsonRestServer server = new JsonRestServer(0);
        AgentRestResource restResource = new AgentRestResource();
        server.start(new Object[]{restResource});

        Platform platform = AgentTest.createBasicPlatform(scheduler);
        Agent created = new Agent(platform, scheduler, server, restResource);
        this.agent = created;
        return created;
    }

    /**
     * Shuts down the agent left behind by a test, if any.
     *
     * <p>Tests that shut the agent down themselves set {@code this.agent} to {@code null},
     * in which case this method does nothing.
     *
     * @throws Exception if shutting down the agent fails
     */
    @AfterEach
    public void cleanUp() throws Exception {
        Agent leftover = this.agent;
        if (leftover == null) {
            return;
        }
        leftover.beginShutdown();
        leftover.waitForShutdown();
    }

    /**
     * Creates an agent on the system scheduler and shuts it down directly.
     */
    @Test
    public void testAgentStartShutdown() throws Exception {
        Agent started = this.createAgent(Scheduler.SYSTEM);
        started.beginShutdown();
        started.waitForShutdown();
        // The agent is already shut down; clear the field so cleanUp() skips it.
        this.agent = null;
    }

    /**
     * Requests shutdown through the {@link AgentClient} and waits for the agent to finish
     * shutting down.
     */
    @Test
    public void testAgentProgrammaticShutdown() throws Exception {
        Agent started = this.createAgent(Scheduler.SYSTEM);
        AgentClient client = new AgentClient.Builder()
            .maxTries(10)
            .target("localhost", started.port())
            .build();

        client.invokeShutdown();
        started.waitForShutdown();
        // Shutdown was triggered via the client; clear the field so cleanUp() skips it.
        this.agent = null;
    }

    /**
     * Checks that the status fetched through the client equals the status the agent
     * reports directly.
     */
    @Test
    public void testAgentGetStatus() throws Exception {
        Agent started = this.createAgent(Scheduler.SYSTEM);
        AgentClient client = new AgentClient.Builder()
            .maxTries(10)
            .target("localhost", started.port())
            .build();

        AgentStatusResponse viaClient = client.status();
        Assertions.assertEquals(started.status(), viaClient);
    }

    /**
     * Creates a worker from {@code NoOpTaskSpec(10, 10)} while the mock clock starts at 100 ms
     * and expects both the worker state and the task state to be reported as done with the
     * error "worker expired".
     *
     * <p>The expected start time is one tick after the initial clock value, and the expected
     * done time is two further ticks after that.
     *
     * <p>NOTE(review): the spec arguments are presumably (startMs, durationMs), making the
     * task already over at creation time -- confirm against NoOpTaskSpec.
     */
    @Test
    public void testCreateExpiredWorkerIsNotScheduled() throws Exception {
        long initialTimeMs = 100L;
        long tickMs = 15L;
        // Single-element array so the anonymous class below can flip the flag.
        final boolean[] toSleep = new boolean[]{true};
        MockTime time = new MockTime(tickMs, initialTimeMs, 0L) {
            /**
             * Flips the flag on every call and delegates to the real sleep only when the flag
             * flips back to true, i.e. on every second call.
             * NOTE(review): presumably this stops scheduler ticks from re-triggering sleep
             * indefinitely -- confirm against MockScheduler.
             */
            @Override
            public void sleep(long ms) {
                toSleep[0] = !toSleep[0];
                if (toSleep[0]) {
                    super.sleep(ms);
                }
            }
        };
        MockScheduler scheduler = new MockScheduler(time);
        Agent agent = this.createAgent(scheduler);
        AgentClient client = new AgentClient.Builder()
            .maxTries(10)
            .target("localhost", agent.port())
            .build();

        // No workers exist before anything is created.
        AgentStatusResponse status = client.status();
        Assertions.assertEquals(Collections.emptyMap(), status.workers());
        new ExpectedTasks().waitFor(client);

        NoOpTaskSpec fooSpec = new NoOpTaskSpec(10L, 10L);
        client.createWorker(new CreateWorkerRequest(0L, "foo", fooSpec));

        long actualStartTimeMs = initialTimeMs + tickMs;
        long doneMs = actualStartTimeMs + 2L * tickMs;
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerDone("foo", fooSpec, actualStartTimeMs, doneMs, null, "worker expired"))
                .taskState(new TaskDone(fooSpec, actualStartTimeMs, doneMs, "worker expired", false, null))
                .build())
            .waitFor(client);
    }

    /**
     * Checks that the uptime fetched through the client equals the agent's own uptime, and
     * that the agent's uptime no longer equals that snapshot once the mock clock is moved
     * from 111 ms to 150 ms.
     */
    @Test
    public void testAgentGetUptime() throws Exception {
        MockTime clock = new MockTime(0L, 111L, 0L);
        MockScheduler mockScheduler = new MockScheduler(clock);
        Agent testAgent = this.createAgent(mockScheduler);
        AgentClient client = new AgentClient.Builder()
            .maxTries(10)
            .target("localhost", testAgent.port())
            .build();

        UptimeResponse snapshot = client.uptime();
        Assertions.assertEquals(testAgent.uptime(), snapshot);

        // After the clock changes, the earlier snapshot must be stale.
        clock.setCurrentTimeMs(150L);
        Assertions.assertNotEquals(testAgent.uptime(), snapshot);
    }

    /**
     * Creates the workers "foo", "bar" and "baz" one after another and waits until the agent
     * reports each of them as running with status "active".
     *
     * <p>Along the way the identical "bar" request is sent twice, and reusing worker id 1 with
     * a different task id or a different spec is asserted to throw
     * {@link RequestConflictException}.
     */
    @Test
    public void testAgentCreateWorkers() throws Exception {
        MockTime clock = new MockTime(0L, 0L, 0L);
        MockScheduler mockScheduler = new MockScheduler(clock);
        Agent testAgent = this.createAgent(mockScheduler);
        AgentClient client = new AgentClient.Builder()
            .maxTries(10)
            .target("localhost", testAgent.port())
            .build();

        // Nothing has been created yet.
        AgentStatusResponse initialStatus = client.status();
        Assertions.assertEquals(Collections.emptyMap(), initialStatus.workers());
        new ExpectedTasks().waitFor(client);

        // First worker: foo.
        NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000L, 600000L);
        client.createWorker(new CreateWorkerRequest(0L, "foo", fooSpec));
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerRunning("foo", fooSpec, 0L, new TextNode("active")))
                .build())
            .waitFor(client);

        // Second worker: bar. Sending the very same request a second time is accepted.
        NoOpTaskSpec barSpec = new NoOpTaskSpec(2000L, 900000L);
        client.createWorker(new CreateWorkerRequest(1L, "bar", barSpec));
        client.createWorker(new CreateWorkerRequest(1L, "bar", barSpec));

        // Same worker id with a different task id or spec must conflict.
        Assertions.assertThrows(
            RequestConflictException.class,
            () -> client.createWorker(new CreateWorkerRequest(1L, "foo", barSpec)),
            "Recreating a request with a different taskId is not allowed");
        Assertions.assertThrows(
            RequestConflictException.class,
            () -> client.createWorker(new CreateWorkerRequest(1L, "bar", fooSpec)),
            "Recreating a request with a different spec is not allowed");

        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerRunning("foo", fooSpec, 0L, new TextNode("active")))
                .build())
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("bar")
                .workerState(new WorkerRunning("bar", barSpec, 0L, new TextNode("active")))
                .build())
            .waitFor(client);

        // Third worker: baz.
        NoOpTaskSpec bazSpec = new NoOpTaskSpec(1L, 450000L);
        client.createWorker(new CreateWorkerRequest(2L, "baz", bazSpec));
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerRunning("foo", fooSpec, 0L, new TextNode("active")))
                .build())
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("bar")
                .workerState(new WorkerRunning("bar", barSpec, 0L, new TextNode("active")))
                .build())
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("baz")
                .workerState(new WorkerRunning("baz", bazSpec, 0L, new TextNode("active")))
                .build())
            .waitFor(client);
    }

    /**
     * Runs two workers against a mock clock that starts at 2000 ms and checks how they finish.
     *
     * <p>"foo" is created at the initial time and is expected to be done, with status "done",
     * 2 ms after its start. "bar" is created 1 ms later, keeps running, and is expected to be
     * done at initial time + 7 ms after it is stopped explicitly.
     */
    @Test
    public void testAgentFinishesTasks() throws Exception {
        long baseMs = 2000L;
        MockTime clock = new MockTime(0L, baseMs, 0L);
        MockScheduler mockScheduler = new MockScheduler(clock);
        Agent testAgent = this.createAgent(mockScheduler);
        AgentClient client = new AgentClient.Builder()
            .maxTries(10)
            .target("localhost", testAgent.port())
            .build();
        new ExpectedTasks().waitFor(client);

        // foo is created while the clock is still at its initial value.
        NoOpTaskSpec fooSpec = new NoOpTaskSpec(baseMs, 2L);
        long fooStartedMs = baseMs;
        long fooDoneMs = fooStartedMs + 2L;
        client.createWorker(new CreateWorkerRequest(0L, "foo", fooSpec));
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerRunning("foo", fooSpec, baseMs, new TextNode("active")))
                .build())
            .waitFor(client);

        // bar is created one millisecond later.
        clock.sleep(1L);
        long barWorkerId = 1L;
        long barStartedMs = baseMs + 1L;
        NoOpTaskSpec barSpec = new NoOpTaskSpec(baseMs, 900000L);
        client.createWorker(new CreateWorkerRequest(barWorkerId, "bar", barSpec));
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerRunning("foo", fooSpec, fooStartedMs, new TextNode("active")))
                .build())
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("bar")
                .workerState(new WorkerRunning("bar", barSpec, barStartedMs, new TextNode("active")))
                .build())
            .waitFor(client);

        // One more millisecond: foo is done, bar is still running.
        clock.sleep(1L);
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerDone("foo", fooSpec, fooStartedMs, fooDoneMs, new TextNode("done"), ""))
                .build())
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("bar")
                .workerState(new WorkerRunning("bar", barSpec, barStartedMs, new TextNode("active")))
                .build())
            .waitFor(client);

        // Five milliseconds later bar is stopped explicitly.
        clock.sleep(5L);
        client.stopWorker(new StopWorkerRequest(barWorkerId));
        long barDoneMs = baseMs + 7L;
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerDone("foo", fooSpec, fooStartedMs, fooDoneMs, new TextNode("done"), ""))
                .build())
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("bar")
                .workerState(new WorkerDone("bar", barSpec, barStartedMs, barDoneMs, new TextNode("done"), ""))
                .build())
            .waitFor(client);
    }

    /**
     * Runs two {@link SampleTaskSpec} workers and checks that each one is reported done with
     * status "halted" as the mock clock advances.
     *
     * <p>"foo" maps "node01" to 1 and has an empty error string; it is expected to be done at
     * 1 ms. "bar" maps "node01" to 2 and carries the error string "baz"; it is expected to be
     * done at 2 ms with that error.
     */
    @Test
    public void testWorkerCompletions() throws Exception {
        MockTime clock = new MockTime(0L, 0L, 0L);
        MockScheduler mockScheduler = new MockScheduler(clock);
        Agent testAgent = this.createAgent(mockScheduler);
        AgentClient client = new AgentClient.Builder()
            .maxTries(10)
            .target("localhost", testAgent.port())
            .build();
        new ExpectedTasks().waitFor(client);

        SampleTaskSpec fooSpec = new SampleTaskSpec(0L, 900000L, Collections.singletonMap("node01", 1L), "");
        client.createWorker(new CreateWorkerRequest(0L, "foo", fooSpec));
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerRunning("foo", fooSpec, 0L, new TextNode("active")))
                .build())
            .waitFor(client);

        SampleTaskSpec barSpec = new SampleTaskSpec(0L, 900000L, Collections.singletonMap("node01", 2L), "baz");
        client.createWorker(new CreateWorkerRequest(1L, "bar", barSpec));

        // At 1 ms foo has halted while bar is still active.
        clock.sleep(1L);
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerDone("foo", fooSpec, 0L, 1L, new TextNode("halted"), ""))
                .build())
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("bar")
                .workerState(new WorkerRunning("bar", barSpec, 0L, new TextNode("active")))
                .build())
            .waitFor(client);

        // At 2 ms bar has halted as well, reporting its error string.
        clock.sleep(1L);
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerDone("foo", fooSpec, 0L, 1L, new TextNode("halted"), ""))
                .build())
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("bar")
                .workerState(new WorkerDone("bar", barSpec, 0L, 2L, new TextNode("halted"), "baz"))
                .build())
            .waitFor(client);
    }

    /**
     * Adds and removes {@link FilesUnreadableFaultSpec} workers and checks, after each step,
     * both the reported worker states and the contents of the mock Kibosh control file.
     *
     * <p>Sequence: the control file starts empty; adding "foo" ("/foo", 123) yields one fault;
     * adding "bar" ("/bar", 456) yields two; stopping "foo" leaves only the "/bar" fault.
     */
    @Test
    public void testKiboshFaults() throws Exception {
        MockTime clock = new MockTime(0L, 0L, 0L);
        MockScheduler mockScheduler = new MockScheduler(clock);
        Agent testAgent = this.createAgent(mockScheduler);
        AgentClient client = new AgentClient.Builder()
            .maxTries(10)
            .target("localhost", testAgent.port())
            .build();
        new ExpectedTasks().waitFor(client);

        try (MockKibosh kibosh = new MockKibosh()) {
            Assertions.assertEquals(Kibosh.KiboshControlFile.EMPTY, kibosh.read());

            // Add the first fault.
            FilesUnreadableFaultSpec fooSpec = new FilesUnreadableFaultSpec(
                0L, 900000L, Collections.singleton("myAgent"), kibosh.tempDir.getPath(), "/foo", 123);
            client.createWorker(new CreateWorkerRequest(0L, "foo", fooSpec));
            new ExpectedTasks()
                .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                    .workerState(new WorkerRunning("foo", fooSpec, 0L, new TextNode("Added fault foo")))
                    .build())
                .waitFor(client);
            Assertions.assertEquals(
                new Kibosh.KiboshControlFile(
                    Collections.singletonList(new Kibosh.KiboshFilesUnreadableFaultSpec("/foo", 123))),
                kibosh.read());

            // Add the second fault.
            FilesUnreadableFaultSpec barSpec = new FilesUnreadableFaultSpec(
                0L, 900000L, Collections.singleton("myAgent"), kibosh.tempDir.getPath(), "/bar", 456);
            client.createWorker(new CreateWorkerRequest(1L, "bar", barSpec));
            new ExpectedTasks()
                .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                    .workerState(new WorkerRunning("foo", fooSpec, 0L, new TextNode("Added fault foo")))
                    .build())
                .addTask(new ExpectedTasks.ExpectedTaskBuilder("bar")
                    .workerState(new WorkerRunning("bar", barSpec, 0L, new TextNode("Added fault bar")))
                    .build())
                .waitFor(client);
            Assertions.assertEquals(
                new Kibosh.KiboshControlFile(
                    Arrays.asList(
                        new Kibosh.KiboshFilesUnreadableFaultSpec("/foo", 123),
                        new Kibosh.KiboshFilesUnreadableFaultSpec("/bar", 456))),
                kibosh.read());

            // Stop the first worker; only the second fault should remain.
            clock.sleep(1L);
            client.stopWorker(new StopWorkerRequest(0L));
            new ExpectedTasks()
                .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                    .workerState(new WorkerDone("foo", fooSpec, 0L, 1L, new TextNode("Removed fault foo"), ""))
                    .build())
                .addTask(new ExpectedTasks.ExpectedTaskBuilder("bar")
                    .workerState(new WorkerRunning("bar", barSpec, 0L, new TextNode("Added fault bar")))
                    .build())
                .waitFor(client);
            Assertions.assertEquals(
                new Kibosh.KiboshControlFile(
                    Collections.singletonList(new Kibosh.KiboshFilesUnreadableFaultSpec("/bar", 456))),
                kibosh.read());
        }
    }

    /**
     * Checks that destroying workers removes them from the agent's reported tasks.
     *
     * <p>A running "foo" worker (id 0) is destroyed; the destroy request is then repeated and
     * also sent for id 1, which was never created, and the test expects no tasks afterwards.
     * A new "foo" worker with id 1 is then created, runs to completion, and is destroyed once
     * it is done, again leaving no tasks.
     */
    @Test
    public void testDestroyWorkers() throws Exception {
        MockTime clock = new MockTime(0L, 0L, 0L);
        MockScheduler mockScheduler = new MockScheduler(clock);
        Agent testAgent = this.createAgent(mockScheduler);
        AgentClient client = new AgentClient.Builder()
            .maxTries(10)
            .target("localhost", testAgent.port())
            .build();
        new ExpectedTasks().waitFor(client);

        NoOpTaskSpec firstSpec = new NoOpTaskSpec(0L, 5L);
        client.createWorker(new CreateWorkerRequest(0L, "foo", firstSpec));
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerRunning("foo", firstSpec, 0L, new TextNode("active")))
                .build())
            .waitFor(client);

        // Destroy the running worker, repeat the request, and destroy an id never created.
        clock.sleep(1L);
        client.destroyWorker(new DestroyWorkerRequest(0L));
        client.destroyWorker(new DestroyWorkerRequest(0L));
        client.destroyWorker(new DestroyWorkerRequest(1L));
        new ExpectedTasks().waitFor(client);

        // Create a second worker under the same task id but a new worker id.
        clock.sleep(1L);
        NoOpTaskSpec secondSpec = new NoOpTaskSpec(2L, 1L);
        client.createWorker(new CreateWorkerRequest(1L, "foo", secondSpec));
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerRunning("foo", secondSpec, 2L, new TextNode("active")))
                .build())
            .waitFor(client);

        clock.sleep(2L);
        new ExpectedTasks()
            .addTask(new ExpectedTasks.ExpectedTaskBuilder("foo")
                .workerState(new WorkerDone("foo", secondSpec, 2L, 4L, new TextNode("done"), ""))
                .build())
            .waitFor(client);

        // Destroying a finished worker also removes it.
        clock.sleep(1L);
        client.destroyWorker(new DestroyWorkerRequest(1L));
        new ExpectedTasks().waitFor(client);
    }

    /**
     * Runs {@code agent.exec(spec, out)} against an in-memory stream and asserts both the
     * text it printed and the boolean it returned.
     *
     * <p>The captured bytes are decoded as UTF-8, the same charset the {@link PrintStream}
     * encodes with, so the comparison does not depend on the platform default charset.
     *
     * @param agent          the agent whose {@code exec} method is called
     * @param expected       the exact text {@code exec} is expected to print
     * @param expectedReturn the value {@code exec} is expected to return
     * @param spec           the task spec passed to {@code exec}
     * @throws Exception if {@code exec} fails or the charset name is not supported
     */
    static void testExec(Agent agent, String expected, boolean expectedReturn, TaskSpec spec) throws Exception {
        final String charsetName = StandardCharsets.UTF_8.name();
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        boolean actualReturn;
        // Auto-flushing stream; closing it guarantees every byte has reached the buffer.
        try (PrintStream out = new PrintStream(captured, true, charsetName)) {
            actualReturn = agent.exec(spec, out);
        }
        Assertions.assertEquals(expected, captured.toString(charsetName));
        Assertions.assertEquals(expectedReturn, actualReturn);
    }

    /**
     * Executes a rebased {@code NoOpTaskSpec(0, 1)} and expects {@code exec} to return
     * {@code false} after printing the "Waiting for completion" line followed by a failure
     * line reporting status null and error "worker expired".
     */
    @Test
    public void testAgentExecWithTimeout() throws Exception {
        Agent testAgent = this.createAgent(Scheduler.SYSTEM);
        NoOpTaskSpec originalSpec = new NoOpTaskSpec(0L, 1L);
        TaskSpec rebasedSpec = testAgent.rebaseTaskSpecTime(originalSpec);

        String waitingLine = String.format("Waiting for completion of task:%s%n", JsonUtil.toPrettyJsonString(rebasedSpec));
        String failureLine = String.format("Task failed with status null and error worker expired%n");
        AgentTest.testExec(testAgent, waitingLine + failureLine, false, rebasedSpec);
    }

    /**
     * Executes a rebased {@link SampleTaskSpec} that maps "node01" to 1 with an empty error
     * string, and expects {@code exec} to return {@code true} after printing the "Waiting for
     * completion" line followed by a success line with status "halted".
     */
    @Test
    public void testAgentExecWithNormalExit() throws Exception {
        Agent testAgent = this.createAgent(Scheduler.SYSTEM);
        SampleTaskSpec originalSpec = new SampleTaskSpec(0L, 120000L, Collections.singletonMap("node01", 1L), "");
        TaskSpec rebasedSpec = testAgent.rebaseTaskSpecTime(originalSpec);

        String waitingLine = String.format("Waiting for completion of task:%s%n", JsonUtil.toPrettyJsonString(rebasedSpec));
        String successLine = String.format("Task succeeded with status \"halted\"%n");
        AgentTest.testExec(testAgent, waitingLine + successLine, true, rebasedSpec);
    }

    /**
     * Test stand-in for a Kibosh mount: a temporary directory containing a
     * "kibosh_control" file that starts out with the empty control-file contents.
     *
     * <p>Closing the instance deletes the temporary directory.
     */
    private static class MockKibosh implements AutoCloseable {
        /** Temporary directory that holds the control file. */
        private final File tempDir;

        /** Location of the "kibosh_control" file inside {@link #tempDir}. */
        private final Path controlFile;

        /**
         * Creates the temporary directory and writes an empty control file into it.
         *
         * @throws IOException if the control file cannot be written
         */
        MockKibosh() throws IOException {
            this.tempDir = TestUtils.tempDirectory();
            String directory = this.tempDir.toPath().toString();
            this.controlFile = Paths.get(directory, "kibosh_control");
            Kibosh.KiboshControlFile.EMPTY.write(this.controlFile);
        }

        /**
         * Reads the current contents of the control file.
         *
         * @return the parsed control file
         * @throws IOException if the file cannot be read
         */
        Kibosh.KiboshControlFile read() throws IOException {
            return Kibosh.KiboshControlFile.read(this.controlFile);
        }

        /**
         * Deletes the temporary directory.
         *
         * @throws Exception if deletion fails
         */
        @Override
        public void close() throws Exception {
            Utils.delete(this.tempDir);
        }
    }
}

