package org.apache.rya.streams.querymanager;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.util.concurrent.AbstractScheduledService;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBException;
import org.apache.commons.daemon.Daemon;
import org.apache.commons.daemon.DaemonContext;
import org.apache.commons.daemon.DaemonInitException;
import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
import org.apache.rya.streams.querymanager.xml.Kafka;
import org.apache.rya.streams.querymanager.xml.QueryManagerConfig;
import org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/streams/querymanager/QueryManagerDaemon.class */
public class QueryManagerDaemon implements Daemon {
    private static final Logger log = LoggerFactory.getLogger(QueryManagerDaemon.class);
    private static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml", new String[0]);
    private QueryManager manager = null;

    /* loaded from: input_file:org/apache/rya/streams/querymanager/QueryManagerDaemon$DaemonParameters.class */
    class DaemonParameters {

        @Parameter(names = {"--config", "-c"}, required = false, description = "The path to the application's configuration file.")
        public String config;

        DaemonParameters() {
        }
    }

    public void init(DaemonContext daemonContext) throws DaemonInitException, Exception {
        Objects.requireNonNull(daemonContext);
        String[] arguments = daemonContext.getArguments();
        DaemonParameters daemonParameters = new DaemonParameters();
        try {
            new JCommander(daemonParameters).parse(arguments);
            Path path = daemonParameters.config != null ? Paths.get(daemonParameters.config, new String[0]) : DEFAULT_CONFIGURATION_PATH;
            log.info("Loading the following configuration file: " + path);
            try {
                InputStream newInputStream = Files.newInputStream(path, new OpenOption[0]);
                Throwable th = null;
                try {
                    try {
                        QueryManagerConfig unmarshall = QueryManagerConfigUnmarshaller.unmarshall(newInputStream);
                        if (newInputStream != null) {
                            if (0 != 0) {
                                try {
                                    newInputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                newInputStream.close();
                            }
                        }
                        QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod queryChanngeLogDiscoveryPeriod = unmarshall.getPerformanceTunning().getQueryChanngeLogDiscoveryPeriod();
                        long longValue = queryChanngeLogDiscoveryPeriod.getValue().longValue();
                        TimeUnit valueOf = TimeUnit.valueOf(queryChanngeLogDiscoveryPeriod.getUnits().toString());
                        log.info("Query Change Log Polling Period: " + longValue + " " + valueOf);
                        AbstractScheduledService.Scheduler newFixedRateSchedule = AbstractScheduledService.Scheduler.newFixedRateSchedule(0L, longValue, valueOf);
                        Kafka kafka = unmarshall.getQueryChangeLogSource().getKafka();
                        log.info("Kafka Source: " + kafka.getHostname() + ":" + kafka.getPort());
                        this.manager = new QueryManager(new LocalQueryExecutor(new CreateKafkaTopic(unmarshall.getQueryExecutor().getLocalKafkaStreams().getZookeepers()), new SingleThreadKafkaStreamsFactory(kafka.getHostname() + ":" + kafka.getPort())), new KafkaQueryChangeLogSource(kafka.getHostname(), kafka.getPort(), newFixedRateSchedule), longValue, valueOf);
                    } finally {
                    }
                } finally {
                }
            } catch (JAXBException | SAXException e) {
                throw new DaemonInitException("Unable to marshall the configuration XML file: " + path, e);
            }
        } catch (ParameterException e2) {
            throw new DaemonInitException("Unable to parse the command line arguments.", e2);
        }
    }

    public void start() throws Exception {
        log.info("Starting the Rya Streams Query Manager Daemon.");
        this.manager.startAndWait();
    }

    public void stop() throws Exception {
        log.info("Stopping the Rya Streams Query Manager Daemon.");
        this.manager.stopAndWait();
    }

    public void destroy() {
    }
}
