package org.apache.kafka.trogdor.agent;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.Fault;
import org.apache.kafka.trogdor.fault.FaultSet;
import org.apache.kafka.trogdor.fault.FaultSpec;
import org.apache.kafka.trogdor.fault.RunningState;
import org.apache.kafka.trogdor.rest.AgentFaultsResponse;
import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
import org.apache.kafka.trogdor.rest.FaultDataMap;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/trogdor/agent/Agent.class */
public final class Agent {
    private static final Logger log = LoggerFactory.getLogger(Agent.class);

    // Clock used to timestamp agent start-up and to schedule fault activation/deactivation.
    private final Time time;

    // Value of time.milliseconds() captured in the constructor.
    private final long startTimeMs;

    // Platform object passed to Fault#activate and Fault#deactivate.
    private final Platform platform;

    // REST server whose port is reported by port(); stopped when the service thread exits.
    private final JsonRestServer restServer;

    // Lock taken around every access in this file to the three fault sets and the shutdown flag.
    private final ReentrantLock lock = new ReentrantLock();

    // Condition the service thread waits on; signalled by createFault() and beginShutdown().
    private final Condition cond = this.lock.newCondition();

    // Faults that have been created but not yet activated by the service thread.
    private final FaultSet pendingFaults = new FaultSet();

    // Faults that were activated successfully and have not yet been deactivated.
    private final FaultSet runningFaults = new FaultSet();

    // Faults that were deactivated, or whose activation or deactivation failed.
    private final FaultSet doneFaults = new FaultSet();

    // Set to true by beginShutdown(); checked by the service thread on each loop iteration.
    private boolean shutdown = false;

    // Body of the service thread.
    private final AgentRunnable runnable = new AgentRunnable();

    // Service thread; started from the constructor.
    // NOTE(review): the third KafkaThread argument is presumably the daemon flag - confirm.
    private final KafkaThread thread = new KafkaThread("TrogdorAgentThread", this.runnable, false);

    /* loaded from: input_file:org/apache/kafka/trogdor/agent/Agent$AgentRunnable.class */
    class AgentRunnable implements Runnable {
        AgentRunnable() {
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            Agent.log.info("Starting main service thread.");
            while (true) {
                try {
                    try {
                        ArrayList<Fault> arrayList = new ArrayList();
                        ArrayList arrayList2 = new ArrayList();
                        ArrayList<Fault> arrayList3 = new ArrayList();
                        ArrayList arrayList4 = new ArrayList();
                        long milliseconds = Agent.this.time.milliseconds();
                        long j = milliseconds + 3600000;
                        Agent.this.lock.lock();
                        try {
                            FaultSet.FaultSetIterator iterateByStart = Agent.this.pendingFaults.iterateByStart();
                            while (iterateByStart.hasNext()) {
                                Fault next = iterateByStart.next();
                                arrayList.add(next);
                                j = Math.min(j, next.spec().startMs() + next.spec().durationMs());
                                iterateByStart.remove();
                            }
                            FaultSet.FaultSetIterator iterateByEnd = Agent.this.runningFaults.iterateByEnd();
                            while (true) {
                                if (!iterateByEnd.hasNext()) {
                                    break;
                                }
                                Fault next2 = iterateByEnd.next();
                                long startedMs = ((RunningState) next2.state()).startedMs() + next2.spec().durationMs();
                                if (milliseconds < startedMs) {
                                    j = Math.min(j, startedMs);
                                    break;
                                } else {
                                    arrayList3.add(next2);
                                    iterateByEnd.remove();
                                }
                            }
                            Agent.this.lock.unlock();
                            for (Fault fault : arrayList) {
                                try {
                                    Agent.log.debug("Activating fault " + fault);
                                    fault.activate(milliseconds, Agent.this.platform);
                                    arrayList2.add(fault);
                                } catch (Throwable th) {
                                    Agent.log.error("Error activating fault " + fault.id(), th);
                                    arrayList4.add(fault);
                                }
                            }
                            for (Fault fault2 : arrayList3) {
                                try {
                                    try {
                                        Agent.log.debug("Deactivating fault " + fault2);
                                        fault2.deactivate(milliseconds, Agent.this.platform);
                                        arrayList4.add(fault2);
                                    } catch (Throwable th2) {
                                        arrayList4.add(fault2);
                                        throw th2;
                                    }
                                } catch (Throwable th3) {
                                    Agent.log.error("Error deactivating fault " + fault2.id(), th3);
                                    arrayList4.add(fault2);
                                }
                            }
                            Agent.this.lock.lock();
                            try {
                                Iterator it = arrayList2.iterator();
                                while (it.hasNext()) {
                                    Agent.this.runningFaults.add((Fault) it.next());
                                }
                                Iterator it2 = arrayList4.iterator();
                                while (it2.hasNext()) {
                                    Agent.this.doneFaults.add((Fault) it2.next());
                                }
                                if (Agent.this.shutdown) {
                                    Agent.this.lock.unlock();
                                    Agent.log.info("AgentRunnable shutting down.");
                                    Agent.this.restServer.stop();
                                    Agent.log.info("AgentRunnable deactivated {} fault(s).", Integer.valueOf(Agent.this.deactivateRunningFaults()));
                                    return;
                                }
                                if (j > milliseconds) {
                                    Agent.log.trace("Sleeping for {} ms", Long.valueOf(j - milliseconds));
                                    if (Agent.this.cond.await(j - milliseconds, TimeUnit.MILLISECONDS)) {
                                        Agent.log.trace("AgentRunnable woke up early");
                                    }
                                }
                                if (Agent.this.shutdown) {
                                    Agent.this.lock.unlock();
                                    Agent.log.info("AgentRunnable shutting down.");
                                    Agent.this.restServer.stop();
                                    Agent.log.info("AgentRunnable deactivated {} fault(s).", Integer.valueOf(Agent.this.deactivateRunningFaults()));
                                    return;
                                }
                                Agent.this.lock.unlock();
                            } finally {
                                Agent.this.lock.unlock();
                            }
                        } catch (Throwable th4) {
                            throw th4;
                        }
                    } catch (Throwable th5) {
                        Agent.log.error("Unhandled exception in AgentRunnable", th5);
                        Agent.log.info("AgentRunnable shutting down.");
                        Agent.this.restServer.stop();
                        Agent.log.info("AgentRunnable deactivated {} fault(s).", Integer.valueOf(Agent.this.deactivateRunningFaults()));
                        return;
                    }
                } catch (Throwable th6) {
                    Agent.log.info("AgentRunnable shutting down.");
                    Agent.this.restServer.stop();
                    Agent.log.info("AgentRunnable deactivated {} fault(s).", Integer.valueOf(Agent.this.deactivateRunningFaults()));
                    throw th6;
                }
            }
        }
    }

    /**
     * Deactivates every fault that is currently in the running set and moves it to the done set.
     *
     * <p>Called by the service thread when it exits. Each fault is removed from the running set
     * and counted before deactivation is attempted, so faults whose deactivation fails are still
     * included in the returned count. Every removed fault is added to the done set exactly once,
     * whatever the outcome of its deactivation.
     *
     * @return the number of faults removed from the running set
     */
    public int deactivateRunningFaults() {
        long now = this.time.milliseconds();
        int numDeactivated = 0;
        this.lock.lock();
        try {
            FaultSet.FaultSetIterator running = this.runningFaults.iterateByStart();
            while (running.hasNext()) {
                Fault fault = running.next();
                try {
                    numDeactivated++;
                    running.remove();
                    fault.deactivate(now, this.platform);
                } catch (Exception e) {
                    log.error("Got exception while deactivating {}", fault, e);
                } finally {
                    // Keep the fault visible in the done set even if deactivation threw.
                    this.doneFaults.add(fault);
                }
            }
            return numDeactivated;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Creates the agent, starts its service thread, and registers the agent with the REST
     * resource.
     *
     * @param platform          platform passed to faults when they are activated or deactivated
     * @param time              clock used for the start timestamp and for fault scheduling
     * @param jsonRestServer    REST server reported by {@link #port()} and stopped when the
     *                          service thread exits
     * @param agentRestResource REST resource that is given a reference to this agent
     */
    public Agent(Platform platform, Time time, JsonRestServer jsonRestServer, AgentRestResource agentRestResource) {
        this.time = time;
        this.startTimeMs = time.milliseconds();
        this.platform = platform;
        this.restServer = jsonRestServer;
        // The fields read by the service thread are all assigned before it is started.
        thread.start();
        agentRestResource.setAgent(this);
    }

    /**
     * Returns the port reported by the agent's REST server.
     *
     * @return the value of {@code restServer.port()}
     */
    public int port() {
        return restServer.port();
    }

    /**
     * Requests shutdown of the agent's service thread.
     *
     * <p>Sets the shutdown flag and signals the condition the service thread waits on. Calling
     * this more than once has no additional effect. The method does not wait for the thread to
     * finish; use {@link #waitForShutdown()} for that.
     */
    public void beginShutdown() {
        this.lock.lock();
        try {
            if (this.shutdown) {
                return;
            }
            this.shutdown = true;
            this.cond.signalAll();
        } finally {
            // Single release point: unlocking here and in the try body would unlock twice.
            this.lock.unlock();
        }
    }

    /**
     * Blocks until the agent's service thread has terminated.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption is logged, the
     * caller's interrupt flag is set again, and the method returns without waiting further.
     */
    public void waitForShutdown() {
        try {
            thread.join();
        } catch (InterruptedException interrupted) {
            log.error("Interrupted while waiting for thread shutdown", interrupted);
            // Re-assert the interrupt so the caller can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the clock reading captured when this agent was constructed.
     *
     * @return the start time in milliseconds, as reported by the agent's {@code Time} instance
     */
    public long startTimeMs() {
        return startTimeMs;
    }

    /**
     * Builds a response describing every fault known to this agent.
     *
     * <p>The pending, running and done sets are read, in that order, while holding the lock.
     * Entries are keyed by fault id, so an id seen in a later set replaces one from an earlier
     * set. The response object itself is created after the lock has been released.
     *
     * @return a response wrapping a sorted map from fault id to fault data
     */
    public AgentFaultsResponse faults() {
        TreeMap<String, FaultDataMap.FaultData> snapshot = new TreeMap<String, FaultDataMap.FaultData>();
        lock.lock();
        try {
            updateFaultsResponse(snapshot, pendingFaults);
            updateFaultsResponse(snapshot, runningFaults);
            updateFaultsResponse(snapshot, doneFaults);
        } finally {
            lock.unlock();
        }
        return new AgentFaultsResponse(snapshot);
    }

    /**
     * Copies the spec and state of every fault in {@code faultSet} into {@code map}, keyed by
     * fault id. An existing entry with the same id is replaced.
     *
     * @param map      destination map, modified in place
     * @param faultSet set of faults to read, iterated by start order
     */
    private void updateFaultsResponse(Map<String, FaultDataMap.FaultData> map, FaultSet faultSet) {
        for (FaultSet.FaultSetIterator it = faultSet.iterateByStart(); it.hasNext(); ) {
            Fault fault = it.next();
            String id = fault.id();
            map.put(id, new FaultDataMap.FaultData(fault.spec(), fault.state()));
        }
    }

    /**
     * Creates a fault from the request and queues it as pending.
     *
     * <p>The fault is built and added to the pending set while holding the lock, and the
     * condition is then signalled so that the waiting service thread wakes up.
     *
     * @param createAgentFaultRequest request carrying the fault id and fault spec
     * @throws ClassNotFoundException propagated from {@code FaultSpec.Util.createFault}
     */
    public void createFault(CreateAgentFaultRequest createAgentFaultRequest) throws ClassNotFoundException {
        lock.lock();
        try {
            Fault fault = FaultSpec.Util.createFault(createAgentFaultRequest.id(), createAgentFaultRequest.spec());
            pendingFaults.add(fault);
            cond.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Command-line entry point for the Trogdor agent.
     *
     * <p>Parses the required {@code --agent.config} and {@code --node-name} arguments, builds the
     * platform and REST server, creates the agent, starts the REST server, and then blocks until
     * the agent's service thread has terminated. A JVM shutdown hook requests agent shutdown and
     * waits for it to complete.
     *
     * <p>If argument parsing fails, help is printed and the exit code is 0 when no arguments
     * were given; otherwise the parser error is reported and the exit code is 1.
     *
     * @param strArr command-line arguments
     * @throws Exception if configuration parsing or server start-up fails
     */
    public static void main(String[] strArr) throws Exception {
        ArgumentParser parser = ArgumentParsers.newArgumentParser("trogdor-agent")
            .defaultHelp(true)
            .description("The Trogdor fault injection agent");
        parser.addArgument("--agent.config")
            .action(Arguments.store())
            .required(true)
            .type(String.class)
            .dest("config")
            .metavar("CONFIG")
            .help("The configuration file to use.");
        parser.addArgument("--node-name")
            .action(Arguments.store())
            .required(true)
            .type(String.class)
            .dest("node_name")
            .metavar("NODE_NAME")
            .help("The name of this node.");
        Namespace namespace = null;
        try {
            namespace = parser.parseArgs(strArr);
        } catch (ArgumentParserException e) {
            if (strArr.length == 0) {
                parser.printHelp();
                Exit.exit(0);
            } else {
                parser.handleError(e);
                Exit.exit(1);
            }
        }
        Platform platform = Platform.Config.parse(namespace.getString("node_name"), namespace.getString("config"));
        JsonRestServer restServer = new JsonRestServer(Node.Util.getTrogdorAgentPort(platform.curNode()));
        AgentRestResource resource = new AgentRestResource();
        // Declared final so the anonymous shutdown-hook class below can capture it.
        final Agent agent = new Agent(platform, Time.SYSTEM, restServer, resource);
        restServer.start(resource);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                log.error("Running shutdown hook...");
                // main is static, so there is no enclosing Agent instance; use the local one.
                agent.beginShutdown();
                agent.waitForShutdown();
            }
        });
        agent.waitForShutdown();
    }
}
