package org.apache.rya.streams.client.command;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AbstractScheduledService;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import kafka.log.LogConfig;
import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
import org.apache.rya.streams.client.RyaStreamsCommand;
import org.apache.rya.streams.kafka.KafkaTopics;
import org.apache.rya.streams.kafka.interactor.KafkaRunQuery;
import org.apache.rya.streams.kafka.interactor.KafkaTopicPropertiesBuilder;
import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
import org.apache.rya.streams.kafka.topology.TopologyFactory;

@DefaultAnnotation({NonNull.class})
/* loaded from: input_file:org/apache/rya/streams/client/command/RunQueryCommand.class */
public class RunQueryCommand implements RyaStreamsCommand {

    /* loaded from: input_file:org/apache/rya/streams/client/command/RunQueryCommand$RunParameters.class */
    private class RunParameters extends RyaStreamsCommand.KafkaParameters {

        @Parameter(names = {"--queryID", "-q"}, required = true, description = "The ID of the query to run.")
        private String queryId;

        @Parameter(names = {"--zookeepers", "-z"}, required = true, description = "The servers that Zookeeper runs on.")
        private String zookeeperServers;

        private RunParameters() {
        }

        @Override // org.apache.rya.streams.client.RyaStreamsCommand.KafkaParameters
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(super.toString());
            if (!Strings.isNullOrEmpty(this.queryId)) {
                sb.append("\tQueryID: " + this.queryId);
                sb.append("\n");
            }
            return sb.toString();
        }
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public String getCommand() {
        return "run-query";
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public String getDescription() {
        return "Runs a Rya Streams query until the command is killed. This command also creates the input and output topics required to execute the query.";
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public String getUsage() {
        JCommander jCommander = new JCommander(new RunParameters());
        StringBuilder sb = new StringBuilder();
        jCommander.usage(sb);
        return sb.toString();
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public boolean validArguments(String[] strArr) {
        boolean z = true;
        try {
            new JCommander(new RunParameters(), strArr);
        } catch (ParameterException e) {
            z = false;
        }
        return z;
    }

    @Override // org.apache.rya.streams.client.RyaStreamsCommand
    public void execute(String[] strArr) throws RyaStreamsCommand.ArgumentsException, RyaStreamsCommand.ExecutionException {
        Objects.requireNonNull(strArr);
        RunParameters runParameters = new RunParameters();
        try {
            new JCommander(runParameters, strArr);
            InMemoryQueryRepository inMemoryQueryRepository = new InMemoryQueryRepository(KafkaQueryChangeLogFactory.make(runParameters.kafkaIP + ":" + runParameters.kafkaPort, KafkaTopics.queryChangeLogTopic(runParameters.ryaInstance)), AbstractScheduledService.Scheduler.newFixedRateSchedule(0L, 5L, TimeUnit.SECONDS));
            try {
                try {
                    UUID fromString = UUID.fromString(runParameters.queryId);
                    if (!inMemoryQueryRepository.get(fromString).isPresent()) {
                        throw new RyaStreamsCommand.ArgumentsException("There is no registered query for queryId " + runParameters.queryId);
                    }
                    HashSet hashSet = new HashSet();
                    hashSet.add(KafkaTopics.statementsTopic(runParameters.ryaInstance));
                    hashSet.add(KafkaTopics.queryResultsTopic(runParameters.ryaInstance, fromString));
                    KafkaTopics.createTopics(runParameters.zookeeperServers, hashSet, 1, 1, Optional.of(new KafkaTopicPropertiesBuilder().setCleanupPolicy(LogConfig.Compact()).build()));
                    new KafkaRunQuery(runParameters.kafkaIP, runParameters.kafkaPort, KafkaTopics.statementsTopic(runParameters.ryaInstance), KafkaTopics.queryResultsTopic(runParameters.ryaInstance, fromString), inMemoryQueryRepository, new TopologyFactory()).run(fromString);
                } catch (Exception e) {
                    throw new RyaStreamsCommand.ExecutionException("Could not execute the Run Query command.", e);
                }
            } catch (RyaStreamsCommand.ExecutionException e2) {
                throw e2;
            } catch (Exception e3) {
                throw new RyaStreamsCommand.ExecutionException("Problem encountered while closing the QueryRepository.", e3);
            }
        } catch (ParameterException e4) {
            throw new RyaStreamsCommand.ArgumentsException("Could not add a new query because of invalid command line parameters.", e4);
        }
    }
}
