package org.apache.edgent.apps.runtime;

import com.google.gson.JsonObject;
import java.util.concurrent.ExecutionException;
import org.apache.edgent.execution.DirectSubmitter;
import org.apache.edgent.execution.Job;
import org.apache.edgent.execution.mbeans.JobMXBean;
import org.apache.edgent.execution.services.ControlService;
import org.apache.edgent.execution.services.JobRegistryService;
import org.apache.edgent.execution.services.RuntimeServices;
import org.apache.edgent.function.BiFunction;
import org.apache.edgent.function.Consumer;
import org.apache.edgent.function.Predicate;
import org.apache.edgent.function.Supplier;
import org.apache.edgent.runtime.jobregistry.JobEvents;
import org.apache.edgent.topology.TStream;
import org.apache.edgent.topology.Topology;
import org.apache.edgent.topology.TopologyProvider;
import org.apache.edgent.topology.mbeans.ApplicationServiceMXBean;
import org.apache.edgent.topology.services.ApplicationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/edgent/apps/runtime/JobMonitorApp.class */
public class JobMonitorApp {
    /**
     * Application name constant. It is not referenced inside this class;
     * presumably callers use it as the topology name -- TODO confirm against callers.
     */
    public static final String APP_NAME = "edgentJobMonitorApp";
    /** Provider used by {@link #declareTopology(String)} to create the monitor topology. */
    private final TopologyProvider provider;
    /** Submitter whose services are validated at construction and which runs the topology in {@link #submit()}. */
    private final DirectSubmitter<Topology, Job> submitter;
    /** The monitor topology, declared once in the constructor. */
    private final Topology topology;
    /** Logger shared by the static helpers of this class and by the nested {@code JobRestarter}. */
    private static final Logger logger = LoggerFactory.getLogger(JobMonitorApp.class);

    /* loaded from: input_file:org/apache/edgent/apps/runtime/JobMonitorApp$JobRestarter.class */
    public static class JobRestarter implements Consumer<JsonObject> {
        private static final long serialVersionUID = 1;
        private final Supplier<RuntimeServices> rts;

        JobRestarter(Supplier<RuntimeServices> supplier) {
            this.rts = supplier;
        }

        public void accept(JsonObject jsonObject) {
            ControlService controlService = (ControlService) ((RuntimeServices) this.rts.get()).getService(ControlService.class);
            String jobName = JobMonitorAppEvent.getJobName(JobMonitorAppEvent.getJob(jsonObject));
            JobMonitorApp.logger.trace("close and restart: {}", jsonObject);
            JobMonitorApp.closeJob(jobName, controlService);
            JobMonitorApp.submitApplication(jobName, controlService);
        }
    }

    /**
     * Creates the monitor application and declares its topology.
     *
     * <p>The submitter is checked for the services this application needs before
     * the topology is declared. Note that {@link #declareTopology(String)} is
     * protected and is invoked from this constructor, so an overriding subclass
     * sees the call before its own constructor body has run.
     *
     * @param topologyProvider provider used to create the topology
     * @param directSubmitter submitter used to validate services and to run the topology
     * @param str name given to the new topology
     * @throws IllegalArgumentException if a required service is not available from the submitter
     */
    public JobMonitorApp(TopologyProvider topologyProvider, DirectSubmitter<Topology, Job> directSubmitter, String str) {
        submitter = directSubmitter;
        // Validation reads the submitter field, so it must follow the assignment above.
        validateSubmitter();
        provider = topologyProvider;
        topology = declareTopology(str);
    }

    /**
     * Submits the monitor topology and blocks until the submission yields its job.
     *
     * @return the job returned by the submitter for the monitor topology
     * @throws InterruptedException if the wait for the submission result is interrupted
     * @throws ExecutionException if the submission completes exceptionally
     */
    public Job submit() throws InterruptedException, ExecutionException {
        final Job monitorJob = submitter.submit(topology).get();
        return monitorJob;
    }

    /**
     * Submits the named application through the application-service control
     * registered with the given control service.
     *
     * @param str name of the application to submit
     * @param controlService control service used to look up the {@link ApplicationServiceMXBean}
     * @throws RuntimeException wrapping any failure, including the
     *         {@link IllegalStateException} raised when no matching control is registered
     */
    public static void submitApplication(String str, ControlService controlService) {
        try {
            final ApplicationServiceMXBean appControl =
                    controlService.getControl("appService", "edgent", ApplicationServiceMXBean.class);
            if (appControl != null) {
                logger.info("Restarting monitored application {}", str);
                // No configuration is passed for the resubmitted application.
                appControl.submit(str, (String) null);
                return;
            }
            throw new IllegalStateException("Could not find a registered control with the following interface: " + ApplicationServiceMXBean.class.getName());
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Requests that the named job close and waits for it to report the
     * {@code CLOSED} state.
     *
     * <p>The job's current state is polled every 100 milliseconds against a
     * 10 second budget. If the calling thread is interrupted while waiting, its
     * interrupt status is restored before the exception is propagated so that
     * callers further up the stack can still observe the interruption.
     *
     * @param str name of the job to close; also used as the alias of its control
     * @param controlService control service used to look up the job's {@link JobMXBean}
     * @throws RuntimeException wrapping any failure: no registered control for the
     *         job, the job not reaching {@code CLOSED} in time (both wrapped
     *         {@link IllegalStateException}s), or an interruption of the wait
     */
    public static void closeJob(String str, ControlService controlService) {
        final long maxWaitMillis = 10000L;
        final long pollIntervalMillis = 100L;
        try {
            JobMXBean jobMXBean = controlService.getControl("job", str, JobMXBean.class);
            if (jobMXBean == null) {
                throw new IllegalStateException("Could not find a registered control for job " + str + " with the following interface: " + JobMXBean.class.getName());
            }
            jobMXBean.stateChange(Job.Action.CLOSE);
            logger.debug("Closing job {}", str);
            long waitStartMillis = System.currentTimeMillis();
            // Poll until the job reports CLOSED or the wait budget is used up.
            for (long remainingMillis = maxWaitMillis;
                    remainingMillis >= 0 && jobMXBean.getCurrentState() != Job.State.CLOSED;
                    remainingMillis -= pollIntervalMillis) {
                Thread.sleep(pollIntervalMillis);
            }
            if (jobMXBean.getCurrentState() != Job.State.CLOSED) {
                throw new IllegalStateException("The unhealthy job " + str + " did not close after 10 seconds");
            }
            logger.debug("Job {} state is CLOSED after waiting for {} milliseconds", str, Long.valueOf(System.currentTimeMillis() - waitStartMillis));
        } catch (InterruptedException e) {
            // Restore the interrupt flag that Thread.sleep cleared when it threw.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a topology with the given name and declares the job-monitoring
     * flow on it.
     *
     * @param str name of the topology to create
     * @return the newly created topology with the monitoring flow declared
     */
    protected Topology declareTopology(String str) {
        final Topology monitorTopology = provider.newTopology(str);
        JobMonitorApp.declareTopology(monitorTopology);
        return monitorTopology;
    }

    /**
     * Declares the job-monitoring flow on the given topology.
     *
     * <p>Job registry events are converted to JSON, filtered by the event
     * predicate declared in this class, and the events that pass are handed to a
     * {@link JobRestarter} sink built from the topology's runtime-service supplier.
     *
     * @param topology topology on which the flow is declared
     */
    public static void declareTopology(Topology topology) {
        // Typed method references to the lambda bodies declared in this class replace
        // the raw-typed references to compiler-synthesized lambda holder classes.
        TStream<JsonObject> jobEvents =
                JobEvents.source(topology, JobMonitorApp::lambda$declareTopology$acfcd8ee$1);
        jobEvents
                .filter(JobMonitorApp::lambda$declareTopology$23eabd63$1)
                .sink(new JobRestarter(topology.getRuntimeServiceSupplier()));
    }

    /**
     * Verifies that the submitter exposes every service this application needs.
     *
     * <p>The services are checked in the order control service, application
     * service, job registry service; the first missing one aborts the check.
     *
     * @throws IllegalArgumentException naming the first required service that is missing
     */
    private void validateSubmitter() {
        final Class<?>[] requiredServices = {
            ControlService.class, ApplicationService.class, JobRegistryService.class
        };
        for (Class<?> serviceType : requiredServices) {
            Object service = submitter.getServices().getService(serviceType);
            if (service == null) {
                throw new IllegalArgumentException("Could not access service " + serviceType.getName());
            }
        }
    }

    /**
     * Event predicate: accepts an event only when its job is reported as
     * unhealthy while both its "state" and "nextState" properties are RUNNING.
     *
     * @param jsonObject event to test
     * @return {@code true} if the event matches; {@code false} otherwise, including
     *         when reading the event raises {@link IllegalArgumentException}
     */
    public static /* synthetic */ boolean lambda$declareTopology$23eabd63$1(JsonObject jsonObject) {
        logger.trace("Filter: {}", jsonObject);
        try {
            final JsonObject jobJson = JobMonitorAppEvent.getJob(jsonObject);
            if (!Job.Health.UNHEALTHY.name().equals(JobMonitorAppEvent.getJobHealth(jobJson))) {
                return false;
            }
            if (!Job.State.RUNNING.name().equals(JobMonitorAppEvent.getProperty(jobJson, "state"))) {
                return false;
            }
            return Job.State.RUNNING.name().equals(JobMonitorAppEvent.getProperty(jobJson, "nextState"));
        } catch (IllegalArgumentException invalidEvent) {
            // A malformed event is logged and dropped instead of failing the stream.
            logger.info("Invalid event filtered out, cause: {}", invalidEvent.getMessage());
            return false;
        }
    }

    /**
     * Event mapper: converts a job registry event into its JSON representation
     * by delegating to {@code JobMonitorAppEvent.toJsonObject}.
     *
     * @param eventType type of the registry event
     * @param job job the event refers to
     * @return the JSON object produced for the event
     */
    public static /* synthetic */ JsonObject lambda$declareTopology$acfcd8ee$1(JobRegistryService.EventType eventType, Job job) {
        final JsonObject eventJson = JobMonitorAppEvent.toJsonObject(eventType, job);
        return eventJson;
    }
}
